package org.apache.rocketmq.store.delay.startup;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;

import java.io.*;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

public class ServerDelayWrapper   {

    protected static final InternalLogger log = InternalLoggerFactory.getLogger(ServerDelayWrapper.class.getCanonicalName());

    protected MessageStore writeMessageStore;
    private Timer timer;

    private ExecutorService jobTaskExecute = Executors.newFixedThreadPool(2);

    public ServerDelayWrapper(final MessageStore writeMessageStore) {
        this.writeMessageStore = writeMessageStore;
        this.timer = new Timer("ScheduleDealMessageTimerThread", true);
        new File(getDelayPath()).mkdirs();
        this.timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    doTask();
                }catch (Exception ex){
                    log.error("delay scheduleAtFixedRate flush exception", ex);
                }
            }
        },0,1000);

        Thread missCallThread = new Thread(() -> {
            try {
                for (; ; ) {
                    Thread.sleep(10 * 1000);
                    sendMissCallMsg();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        missCallThread.setName("delayProcessor-callback-thread");
        missCallThread.start();
        log.info("init delay success " + getDelayPath());

    }


    public void putMessage(MessageExtBrokerInner msgInner) {
        msgInner.setMsgId("");
        msgInner.setWaitStoreMsgOK(false);
        msgInner.setStartDeliverTime(0);
        msgInner.setDelayTimeLevel(0);
        this.writeMessageStore.putMessage(msgInner);
    }


    public void doTask() {
        try {
            long offset = System.currentTimeMillis();
            long curTime = offset / 1000;
            jobTaskExecute.submit(() -> sendMsg(curTime));
        }catch (Exception ex){
            log.error("task delay error",ex);
        }
    }



    private String getDelayPath() {
        String delayPath = "./delay-store" + File.separator + "delay";
        return delayPath;
    }

    public boolean appendDelayMessage(long startTime, MessageExtBrokerInner msgInner) {

        ObjectOutputStream objectOutputStream = null;
        try {
            String msgId = (startTime / 1000) + "-" + System.currentTimeMillis() + "-" + ThreadLocalRandom.current().nextInt(99999999);
            String parentDir = getDelayPath() + File.separator + startTime / 1000;
            File parentFile = new File(parentDir);
            if (!parentFile.exists()) {
                parentFile.mkdirs();
            }
            String fileName = parentDir + File.separator + msgId;

            FileOutputStream fos = new FileOutputStream(fileName);
            BufferedOutputStream bos = new BufferedOutputStream(fos);
            objectOutputStream = new ObjectOutputStream(bos);
            objectOutputStream.writeObject(msgInner);
            return true;
        } catch (Exception ex) {
            log.error("saveMsgFile ex:", ex);
            return false;
        } finally {
            try {
                if (objectOutputStream != null) {
                    objectOutputStream.close();
                }
            } catch (Exception ex) {
                log.error("saveMsgFile ex:", ex);
            }
        }

    }


    private MessageExtBrokerInner readFile(File f) {
        ObjectInputStream ois = null;
        try {
            ois = new ObjectInputStream(new FileInputStream(f));
            return (MessageExtBrokerInner) ois.readObject();
        } catch (Exception ex) {
            return null;
        } finally {
            if (ois != null) {
                try {
                    ois.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    private void sendMissCallMsg() {
        File lst = new File(getDelayPath());
        File[] files = lst.listFiles();
        long startTime = System.currentTimeMillis() / 1000 - 10 ;
        for (File f : files) {
            String name = f.getName();
            if (f.isDirectory() && !name.equals(".") && !name.equals("..")) {
                try {
                    Long fileTime = Long.parseLong(name);
                    if (fileTime <= startTime) {
                        sendMsg(fileTime);
                    }
                } catch (Exception ex) {
                    log.error("send miss delay error:{}",ex);
                }
            }
        }

    }

    private String getCurrentTime() {
        return Thread.currentThread().getName() + ">>[" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss") + "] ";
    }

    private void sendMsg(long startTime) {
        File lst = new File(getDelayPath() + File.separator + startTime);
        File[] files = lst.listFiles();
        if (files != null) {
            for (File f : files) {
                MessageExtBrokerInner msgInner = readFile(f);
                if (msgInner != null) {
                    putMessage(msgInner);
                    f.delete();
                }
            }
            lst.delete();
        }
    }
}
